Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support batch read iceberg source #15214

Merged
merged 30 commits into from
Feb 27, 2024

Conversation

chenzl25
Copy link
Contributor

@chenzl25 chenzl25 commented Feb 23, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

create source xs 
(seq_id bigint, user_id bigint, user_name string)
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = 'minioadmin',
    s3.secret.key = 'minioadmin',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='demo_table'
);

select * from xs;

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

  • Support create iceberg source.
  • Support batch read iceberg source.
  • Currently, only catalog.type = 'storage' is supported in iceberg source

Example:

create source xs 
(seq_id bigint, user_id bigint, user_name string)
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = 'minioadmin',
    s3.secret.key = 'minioadmin',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='demo_table'
);

select * from xs;

Copy link

gitguardian bot commented Feb 23, 2024

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request
GitGuardian id GitGuardian status Secret Commit Filename
9425213 Triggered Generic Password 11fdfcd ci/scripts/regress-test.sh View secret
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely. Learn here the best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider


🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

Our GitHub checks need improvements? Share your feedbacks!

@chenzl25 chenzl25 added the user-facing-changes Contains changes that are visible to users label Feb 23, 2024
@chenzl25
Copy link
Contributor Author

chenzl25 commented Feb 26, 2024

Cluster Info:
Compute Nodes: 4CPU 16GB * 2

Case 1

CREATE source data_gen_source (seq_id bigint, user_id bigint,
  user_name varchar)
WITH (
     connector = 'datagen',
     fields.seq_id.kind = 'sequence',
     fields.seq_id.start = '1',
     fields.seq_id.end = '100000000',
     fields.user_id.kind = 'random',
     fields.user_id.min = '1',
     fields.user_id.max = '10000000',
     fields.user_name.kind = 'random',
     fields.user_name.length = '10',
     datagen.rows.per.second = '500000'
 ) ROW FORMAT JSON;

create sink sink_iceberg from data_gen_source
with (
    connector = 'iceberg',
    type='append-only',
    force_append_only = 'true',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='demo_table'
)

-- Wait until enough data

After compaction, the iceberg table demo_table has 9 parquet files total of 3GB of data. Total rows about 231,331,776.

insert select

create source xs
( seq_id bigint, user_id bigint, user_name string )
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.endpoint = 'https://s3.us-east-1.amazonaws.com',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='demo_table'
);

create table t (seq_id bigint, user_id bigint, user_name varchar);

set batch_enable_distributed_dml = true;
insert into t select * from xs;

INSERT 0 231331776
Time: 183s

about 1.3million rows/s 
about 16MB/s

select

dev=> select count(*) from xs;
   count
-----------
 231331776
(1 row)

Time: 25s

about 9million rows/s
about 122MB/s

Case 2

CREATE source data_gen_source_for_big_table (seq_id bigint, user_id bigint,
  user_name varchar, big_column1 varchar, big_column2 varchar, big_column3 varchar)
WITH (
     connector = 'datagen',
     fields.seq_id.kind = 'sequence',
     fields.seq_id.start = '1',
     fields.seq_id.end = '10000000000',
     fields.user_id.kind = 'random',
     fields.user_id.min = '1',
     fields.user_id.max = '10000000000',
     fields.user_name.kind = 'random',
     fields.user_name.length = '10',
     fields.big_column1.kind = 'random',
     fields.big_column1.length = '256',
     fields.big_column2.kind = 'random',
     fields.big_column2.length = '512',
     fields.big_column3.kind = 'random',
     fields.big_column3.length = '1024',
     datagen.rows.per.second = '1000000'
 ) ROW FORMAT JSON;

create sink sink_iceberg from data_gen_source_for_big_table
with (
    connector = 'iceberg',
    type='append-only',
    force_append_only = 'true',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.endpoint = 'https://s3.us-east-1.amazonaws.com',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='big_table'
);

After compaction, the iceberg table big_table has 32 parquet files total of 30GB of data. Total rows about 21,077,248.

insert select

create source big_table_source
( seq_id bigint, user_id bigint, user_name string, big_column1 varchar , big_column2 varchar , big_column3 varchar )
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://dylan-test-iceberg/',
    s3.endpoint = 'https://s3.us-east-1.amazonaws.com',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='big_table'
);

create table t ( seq_id bigint, user_id bigint, user_name string, big_column1 varchar , big_column2 varchar , big_column3 varchar );

set batch_enable_distributed_dml = true;
insert into t select * from big_table_source;

INSERT 0 21077248
Time: 172s

about 150k rows/s
about 178MB/s

select

dev=> select count(*) from big_table_source;
  count
----------
 21077248
(1 row)

Time: 123603.571 ms (02:03.604)

about 170k rows/s
about 250MB/s

@chenzl25 chenzl25 requested a review from fuyufjh February 26, 2024 07:03
Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest LGTM.

src/connector/src/source/iceberg/mod.rs Show resolved Hide resolved
src/connector/src/parser/mod.rs Show resolved Hide resolved
src/batch/src/executor/source.rs Outdated Show resolved Hide resolved
@fuyufjh
Copy link
Member

fuyufjh commented Feb 26, 2024

Additionally,

Iceberg must be a COW table or contain no deleted files

Is it possible to check and ensure this?

@chenzl25
Copy link
Contributor Author

Additionally,

Iceberg must be a COW table or contain no deleted files

Is it possible to check and ensure this?

The table properties set by e.g. sparks could be selected by RisingWave(icelake). It seems not just a hint for a specific connector or sdk. cc @liurenjie1024

image

src/frontend/src/scheduler/plan_fragmenter.rs Show resolved Hide resolved
src/connector/src/source/iceberg/mod.rs Show resolved Hide resolved
src/connector/src/source/iceberg/mod.rs Outdated Show resolved Hide resolved
src/connector/src/source/iceberg/mod.rs Outdated Show resolved Hide resolved
@liurenjie1024
Copy link
Contributor

How about we merge this after #14885 ?

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@chenzl25
Copy link
Contributor Author

How about we merge this after #14885 ?

Both are okay to me. Conflicts might be easy to be resolved.

@liurenjie1024
Copy link
Contributor

How about we merge this after #14885 ?

Both are okay to me. Conflicts might be easy to be resolved.

We may need to change some tests.

@chenzl25 chenzl25 enabled auto-merge February 27, 2024 08:12
@chenzl25 chenzl25 added this pull request to the merge queue Feb 27, 2024
Merged via the queue into main with commit 0b9d5af Feb 27, 2024
25 of 27 checks passed
@chenzl25 chenzl25 deleted the dylan/support_batch_read_iceberg_source branch February 27, 2024 09:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants